/*
 * MIT License
 *
 * Copyright (c) 2023 北京凯特伟业科技有限公司
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
package com.je.workflow.service.push;

import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Lists;
import com.je.common.base.DynaBean;
import com.je.common.base.message.vo.PushSystemMessage;
import com.je.common.base.service.MetaRbacService;
import com.je.message.rpc.DingTalkRpcService;
import com.je.message.rpc.EmailRpcService;
import com.je.message.rpc.NoteRpcService;
import com.je.message.rpc.SocketPushMessageRpcService;
import com.je.message.vo.dingTalk.DingTalkActionCardWorkNotice;
import com.je.message.vo.dingTalk.DingTalkMsg;
import com.je.workflow.service.push.pojo.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

@Component
public class RedisDelayTaskLister implements CommandLineRunner {
    Logger log = LoggerFactory.getLogger(RedisDelayTaskLister.class);

    /**
     * 延迟队列名称
     */
    private static final String DELAY_QUEUE_NAME = "delayQueue";

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    SocketPushMessageRpcService socketPushMessageRpcService;

    @Autowired
    EmailRpcService emailRpcService;

    @Autowired
    DingTalkRpcService dingTalkRpcService;

    @Autowired
    NoteRpcService noteRpcService;

    @Autowired
    MetaRbacService metaRbacService;

    @Override
    public void run(String... args) throws Exception {
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // 获取一个到点的消息
                    Set<String> set = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_NAME, 0, System.currentTimeMillis(), 0, 10);
                    //log.info("RedisDealyTaskLister set={},time={}", set, LocalDateTime.now());
                    // 如果没有，就等等
                    if (set.isEmpty()) {
                        try {
                            Thread.sleep(2500);//每秒轮训一次
                        } catch (InterruptedException e) {
                            //log.info("RedisDealyTaskLister run e={}", e);
                        }
                        // 继续执行
                        continue;
                    }
                    // 获取具体消息的key
                    List<DelayTask> delayTaskList = new ArrayList<>();
                    for (String msg : set) {
                        //删除成功
                        if (redisTemplate.opsForZSet().remove(DELAY_QUEUE_NAME, msg) > 0) {
                            // 拿到任务
                            DelayTask delayTask = JSONObject.parseObject(msg, DelayTask.class);
                            //log.info("RedisDealyTaskLister delayTask={},time={}", delayTask.toString(), LocalDateTime.now());
                            delayTaskList.add(delayTask);
                        }
                    }
                    //分组、去重
                    List<DelayTask> newList = new ArrayList<>();
                    delayTaskList.stream().filter(distinctByKey(DelayTask::getMsg))  //filter保留true的值
                            .forEach(newList::add);

                    Map<String, List<DelayTask>> listMap = newList.stream().collect(Collectors.groupingBy(DelayTask::getMsgType));
                    try {
                        for (Map.Entry<String, List<DelayTask>> stringListEntry : listMap.entrySet()) {

                            if (PushMessageTypeEnum.SOCKET_PUSH_SYSTEM_MESSAGE.getName().equals(stringListEntry.getKey())) {
                                for (DelayTask delayTask : stringListEntry.getValue()) {
                                    //log.info("消息到期msg={},time={}", delayTask.toString(), LocalDateTime.now());
                                    CommonMessageVo msg = JSONObject.parseObject(delayTask.getMsg().toString(), CommonMessageVo.class);
                                    PushSystemMessage pushSystemMessage = new PushSystemMessage(msg.getPushType(), msg.getContent());
                                    pushSystemMessage.setTargetUserIds(Lists.newArrayList(msg.getToUserId()));
                                    pushSystemMessage.setSourceUserId(msg.getSourceUserId());
                                    socketPushMessageRpcService.sendMessage(pushSystemMessage);
                                }
                            } else if (PushMessageTypeEnum.SOCKET_PUSH_OPEN_FUNC_FORM_MSG.getName().equals(stringListEntry.getKey())) {
                                for (DelayTask delayTask : stringListEntry.getValue()) {
                                    //log.info("消息到期msg={},time={}", delayTask.toString(), LocalDateTime.now());
                                    CommonMessageVo msg = JSONObject.parseObject(delayTask.getMsg().toString(), CommonMessageVo.class);
                                    socketPushMessageRpcService.sendNoticeOpenFuncFormMsgToUser(msg.getToUserId(), msg.getFuncCode(), msg.getPushType(), msg.getNotice(), msg.getPkValue());
                                }
                            } else if (PushMessageTypeEnum.NOTE_PUSH_WITH_RECORD.getName().equals(stringListEntry.getKey())) {
                                for (DelayTask delayTask : stringListEntry.getValue()) {
                                    //log.info("消息到期msg={},time={}", delayTask.toString(), LocalDateTime.now());
                                    NoteMessageVo msg = JSONObject.parseObject(delayTask.getMsg().toString(), NoteMessageVo.class);
                                    noteRpcService.sendWithRecordSendUser(msg.getUserPhone(), msg.getSourceUserName(), msg.getSourceUserId(), msg.getToUserName(), msg.getToUserId(), msg.getContent());
                                }
                            } else if (PushMessageTypeEnum.EMAIL_PUSH_SEND.getName().equals(stringListEntry.getKey())) {
                                for (DelayTask delayTask : stringListEntry.getValue()) {
                                    //log.info("消息到期msg={},time={}", delayTask.toString(), LocalDateTime.now());
                                    EmailMessageVo msg = JSONObject.parseObject(delayTask.getMsg().toString(), EmailMessageVo.class);
                                    emailRpcService.sendEmail(msg.getUserEmail(), msg.getTitle(), msg.getContextType(), msg.getContent());
                                }
                            } else if (PushMessageTypeEnum.DING_TALK_PUSH_SEND.getName().equals(stringListEntry.getKey())) {
                                for (DelayTask delayTask : stringListEntry.getValue()) {
                                    //log.info("消息到期msg={},time={}", delayTask.toString(), LocalDateTime.now());
                                    DingTalkMessageVo msg = JSONObject.parseObject(delayTask.getMsg().toString(), DingTalkMessageVo.class);
                                    DingTalkMsg dingTalkMsg = new DingTalkMsg();
                                    dingTalkMsg.setJeCloudDingTalkId(msg.getJeCloudDingTalkId());
                                    List<String> toUserIds = new ArrayList<>();
                                    toUserIds.add(msg.getToUserId());
                                    DingTalkActionCardWorkNotice dingTalkActionCardWorkNotice = new DingTalkActionCardWorkNotice();
                                    dingTalkActionCardWorkNotice.setBtnOrientation("0");
                                    dingTalkActionCardWorkNotice.setContent(msg.getContent());
                                    dingTalkActionCardWorkNotice.setTitle(msg.getTitle());
                                    List<DingTalkActionCardWorkNotice.BtnInfo> btnInfoList = new ArrayList<>();
                                    DingTalkActionCardWorkNotice.BtnInfo btnInfo = new DingTalkActionCardWorkNotice.BtnInfo();
                                    btnInfo.setTitle("点击查看");
                                    StringBuffer url = new StringBuffer();
                                    DynaBean dynaBean = metaRbacService.selectOneByPk("JE_RBAC_DINGTALK_CONFIG", msg.getJeCloudDingTalkId());
                                    url.append(dynaBean.getStr("CONFIG_DOMAIN"));
                                    url.append("/je/rbac/dingtalk/redirectToDingTalk");
                                    url.append("?appId=" + dynaBean.getStr("CONFIG_APPID"));
                                    url.append("&funcCode=" + msg.getFuncCode());
                                    url.append("&pkValue=" + msg.getPkValue());
                                    url.append("&type=wf");
                                    log.info("-----------------------" + url.toString());
                                    btnInfo.setUrl(url.toString());
                                    btnInfoList.add(btnInfo);
                                    dingTalkActionCardWorkNotice.setBtnInfoList(btnInfoList);
                                    dingTalkMsg.setToUserIds(toUserIds);
                                    dingTalkRpcService.sendCardMsg(dingTalkMsg, dingTalkActionCardWorkNotice);
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });

    }

    static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
        Map<Object, Boolean> seen = new ConcurrentHashMap<>();
        //putIfAbsent方法添加键值对，如果map集合中没有该key对应的值，则直接添加，并返回null，如果已经存在对应的值，则依旧为原来的值。
        //如果返回null表示添加数据成功(不重复)，不重复(null==null :TRUE)
        return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
    }

}
